package com.boarsoft.rpc.core;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cglib.proxy.InvocationHandler;

import com.boarsoft.common.Util;
import com.boarsoft.common.util.StreamUtil;
import com.boarsoft.common.util.StringUtil;
import com.boarsoft.rpc.RpcConfig;
import com.boarsoft.rpc.bean.RpcCall;
import com.boarsoft.rpc.bean.RpcInvoking;
import com.boarsoft.rpc.bean.RpcMethodConfig;
import com.boarsoft.rpc.bean.RpcReferenceConfig;
import com.boarsoft.rpc.bean.RpcRegistry;
import com.boarsoft.rpc.bean.RpcServiceConfig;
import com.boarsoft.rpc.serialize.RpcSerializer;
import com.boarsoft.rpc.spy.RpcSvcSpy;
import com.boarsoft.soagov.config.SvcConfig;
import com.boarsoft.soagov.spy.SpyData;

public class RpcInvoker implements InvocationHandler {
	private static final Logger log = LoggerFactory.getLogger(RpcInvoker.class);

	protected RpcCore rpcCore;
	protected RpcContext rpcContext;
	protected RpcReferenceConfig referenceConfig;
	/** RPC调用模拟器 */
	protected Object mocker;
	/** 是否开启模拟器 */
	protected RpcSvcSpy svcSpy;

	public String toString() {
		return this.referenceConfig.toString();
	}

	public RpcInvoker(RpcReferenceConfig referenceConfig, RpcCore rpcCore, RpcContext rpcContext)
			throws ClassNotFoundException {
		this.referenceConfig = referenceConfig;
		this.rpcCore = rpcCore;
		this.rpcContext = rpcContext;
		if (rpcContext.containsBean("rpcSvcSpy")) {
			this.svcSpy = rpcContext.getBean("rpcSvcSpy", RpcSvcSpy.class);
		}
		//
		String mn = referenceConfig.getMocker();
		if (StringUtil.isEmpty(mn)) {
			return;
		}
		if (rpcContext.containsBean(mn)) {
			mocker = rpcContext.getBean(mn);
		}
	}

	/**
	 * 远程引用本地代理对象被调用时会触发此方法
	 */
	@Override
	public Object invoke(Object caller, Method method, Object[] args) throws Throwable {
		// log.debug("Intercept invocation {} -> {}", caller, method);
		RpcMethodConfig mc = this.referenceConfig.getMethodConfig(method);
		if (mc == null) {
			// 如果没有找到相应的服务方法配置，表示这只是一个普通的本地方法
			try {
				// 此处不能传caller，会导致造成方法调用的反复拦截，形成死循环（递归）调用
				return method.invoke(this, args);
			} catch (Exception e) {
				log.error("Can not find reference config of method {} on caller {}", method, caller.getClass().getName());
				throw new IllegalStateException("Only specified interface method can be invoked remotely.");
			}
		}
		return this.invoke(mc, args, method);
	}

	public Object invoke(RpcMethodConfig rmc, Object[] args, Method method) throws Throwable {
		// 一旦开始停机，所有调用在发起时就报错
		if (!RpcCore.isRunning()) {
			throw new RejectedExecutionException("I'm closing");
		}
		// 服务治理埋点，如果svcSpy不为空，且服务状态为关闭，则尝试使用mocker
		if (svcSpy != null) {
			String key = rmc.getKey();
			SpyData sd = svcSpy.getSpyData(key);
			int status = svcSpy.checkStatus(sd);
			// 检查服务的开关状态，与是否开启了结果模拟
			if (status == SvcConfig.STATUS_DISABLE) {
				// 如果服务被关闭，直接抛出拒绝执行异常
				throw new RejectedExecutionException(key);
			} else if (status == SvcConfig.STATUS_MOCKING) {
				// 如果开启了结果模拟功能，通过服务治理插件调用模拟器，返回模拟的结果
				return svcSpy.mock(rmc, mocker, key, method, args);
			}
		}
		// 上面的服务治理检查通过后，才直接开始发送RPC请求
		// 根据方法调用类型配置，确定处理方式
		switch (rmc.getType()) {
		case RpcMethodConfig.TYPE_SYNC_BROADCAST: // 同步广播
		case RpcMethodConfig.TYPE_ASYNC_BROADCAST: // 异步广播
		case RpcMethodConfig.TYPE_BROADCAST_NOTICE: // 广播通知
			return this.broadcast(rmc, args, method);
		case RpcMethodConfig.TYPE_ASYNC_CALL: // 异步方法调用
		case RpcMethodConfig.TYPE_SYNC_CALL: // 同步方法调用
		case RpcMethodConfig.TYPE_SYNC_NOTICE: // 同步通知
		case RpcMethodConfig.TYPE_ASYNC_NOTICE: // 异步通知
			break;
		default:
			throw new IllegalStateException(String.format("Unknow method type %d", rmc.getType()));
		}
		// 开始执行调用
		int i = 0;
		String sign = referenceConfig.getSign();
		// 根据远程服务方法引用的key查找相应的服务提供者，并从它们中随机选取 一个
		String addr = null;
		while (true) {
			addr = this.rpcContext.getProvider(referenceConfig, rmc, args, method);
			// 检查是否有获取到地址
			if (StringUtil.isEmpty(addr)) {
				// 看看自己是不是也提供这个服务（自己调自己）
				RpcRegistry mr = this.rpcContext.getMyRegistry();
				RpcServiceConfig sc = mr.getServiceConfig(sign);
				if (sc != null) {
					return method.invoke(sc.getRefBean(), args);
				}
				// 如果没有相应的服务提供者，且没有配置模拟器并开启自动模拟，直接抛出异常
				String s = "No provider available for ".concat(sign);
				if (mocker == null || svcSpy == null || !rmc.isAutoMock()) {
					throw new IllegalStateException(s);
				}
				log.warn("{}, call mocker {} instead", s, rmc.getMocker());
				return svcSpy.mock(rmc, mocker, sign, method, args);
			}
			// 检查地址是否有效，这里会清除无效的连接
			if (rpcCore.checkLink(addr)) {
				break;
			}
		}
		// 失败重试（失败转移）
		do {
			try {
				// 根据reference.uri配置决定走HTTP调用还RPC调用
				if (StringUtil.isNotEmpty(referenceConfig.getUri())) {
					return this.httpInvoke(addr, rmc, args);
				}
				// 泛化调用时需从服务提供者的注册表获取relativeId
				if (rmc.getRelativeId() == null) {
					RpcRegistry rr = rpcContext.getRegistry(addr);
					if (rr != null) {
						RpcServiceConfig sc = rr.getServiceConfig(sign);
						log.debug("Search service methods of {} = {}", addr, sc.getMethodConfigMap());
						RpcMethodConfig smc = sc.getMethodConfig(rmc.getSign());
						rmc.setRelativeId(smc.getRelativeId());
					}
				}
				return this.rpcInvoke(addr, rmc, args);
			} catch (RejectedExecutionException e) {
				log.warn("RPC invoking {} be rejected by {}", sign, addr);
			} catch (Throwable e) {
				log.error("Invoke {} failed at {}/{}", rmc, i, rmc.getFailover(), e);
			}
			i++;
		} while (i < rmc.getFailover());
		// 三次调用失败后，尝试模拟器（如果有配置）
		String s = new StringBuilder("Invoke ").append(rmc)//
				.append(" failed after failover ").append(rmc.getFailover()).toString();
		if (mocker == null || svcSpy == null || !rmc.isAutoMock()) {
			throw new IllegalStateException(s);
		}
		log.warn("{}, call mocker {} instead", s, rmc.getMocker());
		return svcSpy.mock(rmc, mocker, sign, method, args);
	}

	@SuppressWarnings("unchecked")
	protected Map<String, Object> broadcast(RpcMethodConfig mc, Object[] args, Method method) throws Throwable {
		// 声明一个map来装每个服务提供者的返回值
		Map<String, Object> rm = new HashMap<String, Object>();
		// 根据服务的key，获取此服务已知的服务提供者
		List<RpcRegistry> addrLt = rpcContext.getProviders(this.referenceConfig, mc, args, method);
		// 找不到服务提供者，则打印一个警告，并返回空map
		if (addrLt == null) {
			log.warn("No provider for method {} of referece {}", mc, this.referenceConfig);
			return rm;
		}
		// 遍历所有服务提供者，获取到这些节点的连接，然后调用其方法
		for (RpcRegistry rr : addrLt) {
			String addr = rr.getKey();
			log.info("Broadcast {} to {}", mc, addr);
			// 不管是同步广播还是异步广播，总是以异步方式处理，返回Future
			RpcLink lo = null;
			try {
				lo = this.rpcCore.link2(addr);
			} catch (Exception e) {
				log.info("Can not get link of {}", addr, e);
			}
			if (lo != null && lo.isConnected()) {
				Future<Object> ro = null;
				try {
					// 不管是同步广播还是异步广播，总是以异步方式处理，返回Future
					if (StringUtil.isEmpty(referenceConfig.getUri())) {
						ro = (Future<Object>) lo.invoke(mc, args);
					} else {
						ro = rpcCore.getThreadPool().submit(new Callable<Object>() {
							@Override
							public Object call() throws Exception {
								try {
									RpcInvoker.this.httpInvoke(addr, mc, args);
								} catch (Throwable e) {
									log.error("Error on do http invoke {}/{}", addr, mc, e);
								}
								return null;
							}
						});
					}
					// 先将获得的Future对象放到map中
					rm.put(addr, ro);
				} catch (Throwable e) {
					log.error("Error on invoke method {} of {}", mc, addr, e);
				}
			}
		}
		// 如果是异步广播，直接向调用者返回装满Future对象的map即可
		if (mc.getType() == RpcMethodConfig.TYPE_ASYNC_BROADCAST) {
			return rm;
		}
		// 如果是同步广播，则需要调用这些Future对象的get方法，并用得到的结果替换map中的Future对象
		long start = System.currentTimeMillis();
		long timeout = 0L;
		for (Entry<String, Object> en : rm.entrySet()) {
			String k = en.getKey();
			// 用总的超时时间，减去已逝去的时间，计算出还可以等待的时间
			Future<Object> ft = (Future<Object>) en.getValue();
			timeout = System.currentTimeMillis() - start;
			timeout = mc.getTimeout() - timeout;
			// timeout<=0 表示不等待，没取到就抛出TimeoutException
			// timeout = Math.max(0L, timeout);
			// 这里得到的对象可能是真实的结果，也可能是一个异常或者null
			Object v = ft.get(timeout, TimeUnit.MILLISECONDS);
			rm.put(k, v); // 替换现有的Future对象
		}
		return rm;
	}

	public Object rpcInvoke(String addr, RpcMethodConfig mc, Object[] args) throws Throwable {
		// 获取到服务提供者的逻辑连接，并进行建立实际的socket连接，但不提交注册表
		RpcLink lo = null;
		try {
			lo = this.rpcCore.link2(addr);
		} catch (Exception e) {
			log.info("Can not get link of {}", addr, e);
		}
		// 如果连接可用，则在此连接上发送RPC调用请求
		if (lo != null && lo.isConnected()) {
			return lo.invoke(mc, args);
		}
		// 如果这个节点连不通，应同时从providerMap和linkMap中移除
		this.rpcCore.removeLink(addr, true, "it is broken");
		throw new IllegalStateException(new StringBuilder()//
				.append("Link ").append(addr).append(" is ")//
				.append(lo == null ? "null" : "disconnected").toString());
	}

	public Object httpInvoke(String addr, RpcMethodConfig mc, Object[] args) throws Throwable {
		RpcRegistry reg = rpcContext.getRegistry(addr);
		RpcInvoking ri = rpcContext.newInvoking(addr, mc, args);
		// RpcCall rc = ri.getRpcCall();
		String[] aa = addr.split(":");
		aa[1] = reg.getMetaString("rpc.http.port", null);
		if (StringUtil.isEmpty(aa[1])) {
			throw new IllegalStateException(new StringBuilder("Remote node ")//
					.append(addr).append(" has no http port config").toString());
		}
		String url = new StringBuilder("http://")//
				.append(Util.array2str(aa, ":"))//
				.append(mc.getUri()).append("?port=")//
				.append(RpcConfig.getPort()).toString();
		byte[] ba = RpcSerializer.serialize(ri.getRpcCall());
		BufferedOutputStream bos = null;
		BufferedInputStream bis = null;
		HttpURLConnection conn = null;
		try {
			conn = (HttpURLConnection) new URL(url).openConnection();
			conn.setConnectTimeout(RpcConfig.CONNECT_TIMEOUT);
			conn.setReadTimeout(RpcConfig.READ_TIMEOUT);
			conn.setRequestMethod("POST");
			conn.setDoOutput(true);
			conn.setDoInput(true);
			// headers
			conn.setRequestProperty("User-Agent", "Mozilla/5.0");
			conn.setRequestProperty("Content-Type", "application/octet-stream");
			conn.setRequestProperty("Content-Length", String.valueOf(ba.length));
			conn.setRequestProperty("service", mc.getFaceConfig().toString());
			conn.setRequestProperty("method", mc.getSign());
			// 发送数据
			bos = new BufferedOutputStream(conn.getOutputStream());
			bos.write(ba);
			bos.flush();
			// 判断响应码
			if (conn.getResponseCode() != 200) {
				throw new IllegalStateException(new StringBuilder()//
						.append("Response code = ")//
						.append(conn.getResponseCode()).toString());
			}
			// 读取返回
			bis = new BufferedInputStream(conn.getInputStream());
			int len = conn.getContentLength();
			if (len > 0) {
				ba = new byte[len];
				int read = 0, r = -1;
				while (read < len && (r = bis.read(ba, read, len)) > -1) {
					read += Math.max(0, r);
				}
				if (read == len) {
					RpcCall co = RpcSerializer.deserialize(ba);
					// 缓存时保存了RpcCall，这个对象才是原始的RPC请求对象，co仅用于传输
					// rc.setResult(co.getResult());
					// rc.setThrowable(co.getThrowable());
					if (co.getThrowable() != null) {
						throw co.getThrowable();
					}
					return co.getResult();
				}
			}
		} finally {
			// 先移除调用信息
			rpcContext.removeInvoking(ri);
			// ri.complete(); // 通知发起调用线程的继续
			StreamUtil.close(bis);
			StreamUtil.close(bos);
			if (conn != null) {
				conn.disconnect();
			}
		}
		return null;
	}

	/**
	 * 允许直接设置和读取Mocker
	 * 
	 * @param mocker
	 */
	public void setMocker(Object mocker) {
		this.mocker = mocker;
	}

	/**
	 * 允许直接设置和读取Mocker
	 * 
	 * @return
	 */
	public Object getMocker() {
		return mocker;
	}

	public RpcSvcSpy getSvcSpy() {
		return svcSpy;
	}

	public void setSvcSpy(RpcSvcSpy svcSpy) {
		this.svcSpy = svcSpy;
	}

}
